package com.carol.bigdata.task.feature

import com.carol.bigdata.Config
import com.carol.bigdata.constant.KVConstant
import com.carol.bigdata.utils.HBaseUtil
import com.carol.bigdata.utils.{Flag, TimeUtil}
import com.carol.bigdata.Config.hbaseParams
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import com.carol.bigdata.utils.FeatureUtil.{calFeat, calTotal, changeProfileRDD, getDataRowKey}
import com.carol.bigdata.utils.RddReader.readHiveRDD


object CalPayTag {

    // Base key columns shared by all tables (e.g. gameId, accountId).
    val keyColumns: List[String] = KVConstant.keyColumns
    val keyLen: Int = keyColumns.length
    val cf: String = KVConstant.cf
    val propertiesField: String = "properties"

    // Data source: payment/order table.
    val orderTable: String = KVConstant.actionTable
    val orderColumns: List[String] = keyColumns :+ propertiesField
    val orderPattern: String = KVConstant.orderPattern
    val orderPropIdx: Int = orderColumns.indexOf(propertiesField)
    val orderPropColumns: List[String] = List("order_amount", "order_status", "order_type")
    // Index of order_status in a flattened row: key columns first, then property columns.
    val orderStatusIdx: Int = orderPropColumns.indexOf("order_status") + keyLen

    // Daily intermediate table: per-user profile statistics result table.
    val userProfileTable: String = KVConstant.userProfileTable
    val keyCF: String = KVConstant.keyCF
    val keyCols: List[String] = KVConstant.dateKeyColumns
    val writeKeyCols: List[String] = KVConstant.writeKeyCols
    val event: String = "profile"

    // User payment feature columns (HBase column family "pay_tag") and their
    // aggregation types, consumed by FeatureUtil.calFeat / calTotal.
    val payMiddleCf: String = "pay_tag"
    val payFeat: List[(String, String)] = List(
        ("$pay_money", "split_map"),
        ("$pay_count", "map"),
        ("$pay_amount", "int"),
        ("$pay_frequency", "int")
    )


    /**
     * Aggregates payment features per user for one statistical day.
     *
     * @param statDay  statistical day, prepended to each output key
     * @param payUsers rows of List(gameId, accountId, amount, orderStatus, orderType)
     * @param featList feature (name, aggregationType) pairs passed to calFeat
     * @return RDD of (List(statDay, gameId, accountId), aggregated feature values)
     */
    def calPayInfo(statDay: String,
                   payUsers: RDD[List[String]],
                   featList: List[(String, String)]): RDD[(List[String], List[String])] = {
        // Keep only successfully paid orders (order_status == "1").
        val payInfo = payUsers.filter(_ (orderStatusIdx) == "1")
            // => key List(gameId, accountId),
            //    value List(orderType:amount, orderType, amount, "1") — each wrapped in
            //    a singleton List so reduceByKey can concatenate per-position lists.
            .map(x => {
                val key = x.take(keyLen)
                // orderStatus already filtered above, so it is intentionally discarded.
                val List(amount, _, orderType) = x.takeRight(3)
                val tmpAmount = List(orderType, amount).mkString(":")
                (key, List(tmpAmount, orderType, amount, "1").map(List(_)))
            })
            .reduceByKey((a, b) => a.zip(b).map(i => i._1 ++ i._2))
            .map(x => {
                val res = calFeat(x._2, featList)
                (statDay +: x._1, res)
            })
            // Cached because count()/take(5) below plus the caller's downstream
            // writes would otherwise recompute the whole lineage for every action.
            .cache()
        println(s"payInfo: ${payInfo.count()}")
        println(s"feat: ${featList}")
        payInfo.take(5).foreach(println)
        payInfo
    }


    /**
     * Runs the payment-feature pipeline for one game and one day:
     * read orders from Hive, aggregate daily features, write them to the
     * HBase profile table, then accumulate and write running totals.
     *
     * @param hbaseParams HBase connection parameters (shadows Config.hbaseParams import)
     * @param spark       active SparkSession with Hive support
     * @param statDay     statistical day being processed
     * @param game        game id selecting the data partition
     */
    def run(hbaseParams: Map[String, String],
            spark: SparkSession,
            statDay: String,
            game: String): Unit = {
        println("Computing payment features")
        val yesterday: String = TimeUtil.getTimeDeltaDay(statDay, -1)
        // Row keys of yesterday's profile rows, needed to accumulate totals.
        val yesterdayRowKey = for (date <- List(yesterday)) yield getDataRowKey(game, event, date)
//        val orderChooseRowKey = for (date <- List(statDay)) yield getDataRowKey(game, orderPattern,  date)

        // 1. Read source rows.
        // List(gameId,accountId,properties) => List(gameId,accountId,amount,orderStatus,orderType)
        // (Former HBase read path kept for reference:)
        //val payUsers = HBaseUtil.readAsList(hbaseParams, spark, orderTable, cf, orderColumns, orderChooseRowKey,
        //    timeColumn = "time", timeMode = "day", filterMode = "PATTERN AND TIME")
        //    .map(parseProperties(_, orderPropIdx, orderPropColumns))
        val payUsers = readHiveRDD(spark, orderTable, statDay, orderPattern, game, keyColumns, orderPropColumns)
            // Cached: count()/take(5) below and calPayInfo each trigger actions
            // that would otherwise re-read Hive every time.
            .cache()
        println(s"payUsers: ${payUsers.count()}")
        payUsers.take(5).foreach(println)

        // 2. Aggregate daily payment features.
        val payRDD = calPayInfo(statDay, payUsers, payFeat)
        // List(gameId,event,time,accountId), List(pay_money,pay_count,pay_amount,pay_frequency)
        val writePayRDD = changeProfileRDD(payRDD)

        // 3. Write daily features to the HBase intermediate table.
        val payFeatColumns = payFeat.map(_._1)
        HBaseUtil.writeRDD2KVColumn(hbaseParams, userProfileTable, writePayRDD, keyCF, writeKeyCols, payFeatColumns,
            List.fill(payFeatColumns.length)(payMiddleCf))

        // 4. Accumulate running totals (today's values + yesterday's totals).
        val List(totalIntRDD, totalMapRDD) = calTotal(hbaseParams, spark, statDay, payRDD, userProfileTable,
            List(keyCF, payMiddleCf), keyCols, payFeat, yesterdayRowKey)
        val writeTotalIntRDD = changeProfileRDD(totalIntRDD)
        val writeTotalMapRDD = changeProfileRDD(totalMapRDD)

        // 5. Write totals: "_total"-suffixed columns, split by aggregation type.
        val totalIntColumns = payFeat.filter(_._2.toLowerCase == "int").map(_._1 + "_total")
        val totalMapColumns = payFeat.filter(_._2.toLowerCase == "map").map(_._1 + "_total")
        HBaseUtil.writeRDD2KVColumn(hbaseParams, userProfileTable, writeTotalIntRDD, keyCF, writeKeyCols, totalIntColumns,
            List.fill(totalIntColumns.length)(payMiddleCf))
        HBaseUtil.writeRDD2KVColumn(hbaseParams, userProfileTable, writeTotalMapRDD, keyCF, writeKeyCols, totalMapColumns,
            List.fill(totalMapColumns.length)(payMiddleCf))
    }


    /** Entry point: iterates the configured games/timezones/days and runs the pipeline. */
    def main(args: Array[String]): Unit = {
        Flag.Parse(args)
        val spark = SparkSession.builder()
            .config(conf = Config.sparkConf)
            .enableHiveSupport()
            .getOrCreate()
        val sc = spark.sparkContext
        sc.setLogLevel("ERROR")
        //val ipPath: String = sc.broadcast(Config.ipPath).value
        val gameList = List("200")
        val timezoneList = List("Asia/Bangkok")
        for (game <- gameList) {
            // NOTE(review): `timezone` is looped over but never passed to run() —
            // with more than one timezone this would redo identical work; confirm intent.
            for (timezone <- timezoneList) {
                for (day <- 1 to 1) {
                    val statday = "2021-08-%02d".format(day)
                    TimeUtil.timer(run(hbaseParams, spark, statday, game))
                }
            }
        }

        spark.stop()
    }
}
